package net.quanter.shield.mq.rocketmq.producer;


import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.common.dto.result.ResultDTO;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.MQProducer;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQTopicParam;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.Map;

/**
 * 社区版RocketMQ发送者服务
 */
@Slf4j
public class RocketMQCommunityProducer<T> implements MQProducer<T, Message> {

    private final RocketMQBorkerParam mqBorkerParam;
    private final DefaultMQProducer producer;
    private final RocketMQTopicParam topic;

    public RocketMQCommunityProducer(
            RocketMQBorkerParam mqBorkerParam, RocketMQTopicParam topic, String producerGroup) throws MQClientException {
        this.mqBorkerParam = mqBorkerParam;
        this.topic = topic;
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(mqBorkerParam.getEndPoint());
        producer.start();
    }

    @Override
    public Message getSourceMessageFromMQMessage(MQMessageVO<T> mqMessageVO) {
        Message message = new Message();
        message.setBody(mqMessageVO.getBase64Obj());
        message.setTopic(mqMessageVO.getTopic());
        message.setTags(mqMessageVO.getTag());
        message.setKeys(mqMessageVO.getMessageId());
        for (Map.Entry<String, String> entry : mqMessageVO.getProperties().entrySet()) {
            message.putUserProperty(entry.getKey(), entry.getValue());
        }
        return message;
    }

    @Override
    public ResultDTO send(MQMessageVO mqMessageVO, Message message) {
        try {
            SendResult sendResult = producer.send(message);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                mqMessageVO.setMessageId(sendResult.getMsgId());
                return ResultDTO.TRUE;
            } else {
                return ResultDTO.failure().message(sendResult.getSendStatus().name());
            }
        } catch (Exception e) {
            log.error("RocketMQCommunityProducer send error,mqMessageVO={}",
                    JSON.toJSON(mqMessageVO),e);
            return ResultDTO.failure().message(e.getMessage());
        }
    }

    @Override
    public void close() {
        producer.shutdown();
    }
}
